1   package org.apache.lucene.codecs.ramonly;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one or more
5    * contributor license agreements.  See the NOTICE file distributed with
6    * this work for additional information regarding copyright ownership.
7    * The ASF licenses this file to You under the Apache License, Version 2.0
8    * (the "License"); you may not use this file except in compliance with
9    * the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.codecs.FieldsConsumer;
import org.apache.lucene.codecs.FieldsProducer;
import org.apache.lucene.codecs.PostingsFormat;
import org.apache.lucene.codecs.TermStats;
import org.apache.lucene.index.PostingsEnum;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.Fields;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.SegmentReadState;
import org.apache.lucene.index.SegmentWriteState;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.Accountables;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.FixedBitSet;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.RamUsageEstimator;
54  
55  /** Stores all postings data in RAM, but writes a small
56   *  token (header + single int) to identify which "slot" the
57   *  index is using in RAM HashMap.
58   *
59   *  NOTE: this codec sorts terms by reverse-unicode-order! */
60  
61  public final class RAMOnlyPostingsFormat extends PostingsFormat {
62  
63    public RAMOnlyPostingsFormat() {
64      super("RAMOnly");
65    }
66      
67    // Postings state:
68    static class RAMPostings extends FieldsProducer {
69      final Map<String,RAMField> fieldToTerms = new TreeMap<>();
70  
71      @Override
72      public Terms terms(String field) {
73        return fieldToTerms.get(field);
74      }
75  
76      @Override
77      public int size() {
78        return fieldToTerms.size();
79      }
80  
81      @Override
82      public Iterator<String> iterator() {
83        return Collections.unmodifiableSet(fieldToTerms.keySet()).iterator();
84      }
85  
86      @Override
87      public void close() {
88      }
89  
90      @Override
91      public long ramBytesUsed() {
92        long sizeInBytes = 0;
93        for(RAMField field : fieldToTerms.values()) {
94          sizeInBytes += field.ramBytesUsed();
95        }
96        return sizeInBytes;
97      }
98      
99      @Override
100     public Collection<Accountable> getChildResources() {
101       return Accountables.namedAccountables("field", fieldToTerms);
102     }
103 
104     @Override
105     public void checkIntegrity() throws IOException {}
106   } 
107 
108   static class RAMField extends Terms implements Accountable {
109     final String field;
110     final SortedMap<String,RAMTerm> termToDocs = new TreeMap<>();
111     long sumTotalTermFreq;
112     long sumDocFreq;
113     int docCount;
114     final FieldInfo info;
115 
116     RAMField(String field, FieldInfo info) {
117       this.field = field;
118       this.info = info;
119     }
120 
121     @Override
122     public long ramBytesUsed() {
123       long sizeInBytes = 0;
124       for(RAMTerm term : termToDocs.values()) {
125         sizeInBytes += term.ramBytesUsed();
126       }
127       return sizeInBytes;
128     }
129 
130     @Override
131     public Collection<Accountable> getChildResources() {
132       return Collections.emptyList();
133     }
134 
135     @Override
136     public long size() {
137       return termToDocs.size();
138     }
139 
140     @Override
141     public long getSumTotalTermFreq() {
142       return sumTotalTermFreq;
143     }
144 
145     @Override
146     public long getSumDocFreq() throws IOException {
147       return sumDocFreq;
148     }
149       
150     @Override
151     public int getDocCount() throws IOException {
152       return docCount;
153     }
154 
155     @Override
156     public TermsEnum iterator() {
157       return new RAMTermsEnum(RAMOnlyPostingsFormat.RAMField.this);
158     }
159 
160     @Override
161     public boolean hasFreqs() {
162       return info.getIndexOptions().compareTo(IndexOptions.DOCS_AND_FREQS) >= 0;
163     }
164 
165     @Override
166     public boolean hasOffsets() {
167       return info.getIndexOptions().compareTo(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS) >= 0;
168     }
169 
170     @Override
171     public boolean hasPositions() {
172       return info.getIndexOptions().compareTo(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS) >= 0;
173     }
174     
175     @Override
176     public boolean hasPayloads() {
177       return info.hasPayloads();
178     }
179   }
180 
181   static class RAMTerm implements Accountable {
182     final String term;
183     long totalTermFreq;
184     final List<RAMDoc> docs = new ArrayList<>();
185     public RAMTerm(String term) {
186       this.term = term;
187     }
188 
189     @Override
190     public long ramBytesUsed() {
191       long sizeInBytes = 0;
192       for(RAMDoc rDoc : docs) {
193         sizeInBytes += rDoc.ramBytesUsed();
194       }
195       return sizeInBytes;
196     }
197 
198     @Override
199     public Collection<Accountable> getChildResources() {
200       return Collections.emptyList();
201     }
202   }
203 
204   static class RAMDoc implements Accountable {
205     final int docID;
206     final int[] positions;
207     byte[][] payloads;
208 
209     public RAMDoc(int docID, int freq) {
210       this.docID = docID;
211       positions = new int[freq];
212     }
213 
214     @Override
215     public long ramBytesUsed() {
216       long sizeInBytes = 0;
217       sizeInBytes +=  (positions!=null) ? RamUsageEstimator.sizeOf(positions) : 0;
218       
219       if (payloads != null) {
220         for(byte[] payload: payloads) {
221           sizeInBytes += (payload!=null) ? RamUsageEstimator.sizeOf(payload) : 0;
222         }
223       }
224       return sizeInBytes;
225     }
226     
227     @Override
228     public Collection<Accountable> getChildResources() {
229       return Collections.emptyList();
230     }
231   }
232 
233   // Classes for writing to the postings state
234   private static class RAMFieldsConsumer extends FieldsConsumer {
235 
236     private final RAMPostings postings;
237     private final RAMTermsConsumer termsConsumer = new RAMTermsConsumer();
238     private final SegmentWriteState state;
239 
240     public RAMFieldsConsumer(SegmentWriteState writeState, RAMPostings postings) {
241       this.postings = postings;
242       this.state = writeState;
243     }
244 
245     @Override
246     public void write(Fields fields) throws IOException {
247       for(String field : fields) {
248 
249         Terms terms = fields.terms(field);
250         if (terms == null) {
251           continue;
252         }
253 
254         TermsEnum termsEnum = terms.iterator();
255 
256         FieldInfo fieldInfo = state.fieldInfos.fieldInfo(field);
257         if (fieldInfo.getIndexOptions().compareTo(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS) >= 0) {
258           throw new UnsupportedOperationException("this codec cannot index offsets");
259         }
260 
261         RAMField ramField = new RAMField(field, fieldInfo);
262         postings.fieldToTerms.put(field, ramField);
263         termsConsumer.reset(ramField);
264 
265         FixedBitSet docsSeen = new FixedBitSet(state.segmentInfo.maxDoc());
266         long sumTotalTermFreq = 0;
267         long sumDocFreq = 0;
268         PostingsEnum postingsEnum = null;
269         int enumFlags;
270 
271         IndexOptions indexOptions = fieldInfo.getIndexOptions();
272         boolean writeFreqs = indexOptions.compareTo(IndexOptions.DOCS_AND_FREQS) >= 0;
273         boolean writePositions = indexOptions.compareTo(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS) >= 0;
274         boolean writeOffsets = indexOptions.compareTo(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS) >= 0;        
275         boolean writePayloads = fieldInfo.hasPayloads();
276 
277         if (writeFreqs == false) {
278           enumFlags = 0;
279         } else if (writePositions == false) {
280           enumFlags = PostingsEnum.FREQS;
281         } else if (writeOffsets == false) {
282           if (writePayloads) {
283             enumFlags = PostingsEnum.PAYLOADS;
284           } else {
285             enumFlags = 0;
286           }
287         } else {
288           if (writePayloads) {
289             enumFlags = PostingsEnum.PAYLOADS | PostingsEnum.OFFSETS;
290           } else {
291             enumFlags = PostingsEnum.OFFSETS;
292           }
293         }
294 
295         while (true) {
296           BytesRef term = termsEnum.next();
297           if (term == null) {
298             break;
299           }
300           RAMPostingsWriterImpl postingsWriter = termsConsumer.startTerm(term);
301           postingsEnum = termsEnum.postings(postingsEnum, enumFlags);
302 
303           int docFreq = 0;
304           long totalTermFreq = 0;
305           while (true) {
306             int docID = postingsEnum.nextDoc();
307             if (docID == PostingsEnum.NO_MORE_DOCS) {
308               break;
309             }
310             docsSeen.set(docID);
311             docFreq++;
312 
313             int freq;
314             if (writeFreqs) {
315               freq = postingsEnum.freq();
316               totalTermFreq += freq;
317             } else {
318               freq = -1;
319             }
320 
321             postingsWriter.startDoc(docID, freq);
322             if (writePositions) {
323               for (int i=0;i<freq;i++) {
324                 int pos = postingsEnum.nextPosition();
325                 BytesRef payload = writePayloads ? postingsEnum.getPayload() : null;
326                 int startOffset;
327                 int endOffset;
328                 if (writeOffsets) {
329                   startOffset = postingsEnum.startOffset();
330                   endOffset = postingsEnum.endOffset();
331                 } else {
332                   startOffset = -1;
333                   endOffset = -1;
334                 }
335                 postingsWriter.addPosition(pos, payload, startOffset, endOffset);
336               }
337             }
338 
339             postingsWriter.finishDoc();
340           }
341           termsConsumer.finishTerm(term, new TermStats(docFreq, totalTermFreq));
342           sumDocFreq += docFreq;
343           sumTotalTermFreq += totalTermFreq;
344         }
345 
346         termsConsumer.finish(sumTotalTermFreq, sumDocFreq, docsSeen.cardinality());
347       }
348     }
349 
350     @Override
351     public void close() throws IOException {
352     }
353   }
354 
355   private static class RAMTermsConsumer {
356     private RAMField field;
357     private final RAMPostingsWriterImpl postingsWriter = new RAMPostingsWriterImpl();
358     RAMTerm current;
359       
360     void reset(RAMField field) {
361       this.field = field;
362     }
363       
364     public RAMPostingsWriterImpl startTerm(BytesRef text) {
365       final String term = text.utf8ToString();
366       current = new RAMTerm(term);
367       postingsWriter.reset(current);
368       return postingsWriter;
369     }
370 
371     public void finishTerm(BytesRef text, TermStats stats) {
372       assert stats.docFreq > 0;
373       assert stats.docFreq == current.docs.size();
374       current.totalTermFreq = stats.totalTermFreq;
375       field.termToDocs.put(current.term, current);
376     }
377 
378     public void finish(long sumTotalTermFreq, long sumDocFreq, int docCount) {
379       field.sumTotalTermFreq = sumTotalTermFreq;
380       field.sumDocFreq = sumDocFreq;
381       field.docCount = docCount;
382     }
383   }
384 
385   static class RAMPostingsWriterImpl {
386     private RAMTerm term;
387     private RAMDoc current;
388     private int posUpto = 0;
389 
390     public void reset(RAMTerm term) {
391       this.term = term;
392     }
393 
394     public void startDoc(int docID, int freq) {
395       current = new RAMDoc(docID, freq);
396       term.docs.add(current);
397       posUpto = 0;
398     }
399 
400     public void addPosition(int position, BytesRef payload, int startOffset, int endOffset) {
401       assert startOffset == -1;
402       assert endOffset == -1;
403       current.positions[posUpto] = position;
404       if (payload != null && payload.length > 0) {
405         if (current.payloads == null) {
406           current.payloads = new byte[current.positions.length][];
407         }
408         byte[] bytes = current.payloads[posUpto] = new byte[payload.length];
409         System.arraycopy(payload.bytes, payload.offset, bytes, 0, payload.length);
410       }
411       posUpto++;
412     }
413 
414     public void finishDoc() {
415       assert posUpto == current.positions.length;
416     }
417   }
418 
419   static class RAMTermsEnum extends TermsEnum {
420     Iterator<String> it;
421     String current;
422     private final RAMField ramField;
423 
424     public RAMTermsEnum(RAMField field) {
425       this.ramField = field;
426     }
427       
428     @Override
429     public BytesRef next() {
430       if (it == null) {
431         if (current == null) {
432           it = ramField.termToDocs.keySet().iterator();
433         } else {
434           it = ramField.termToDocs.tailMap(current).keySet().iterator();
435         }
436       }
437       if (it.hasNext()) {
438         current = it.next();
439         return new BytesRef(current);
440       } else {
441         return null;
442       }
443     }
444 
445     @Override
446     public SeekStatus seekCeil(BytesRef term) {
447       current = term.utf8ToString();
448       it = null;
449       if (ramField.termToDocs.containsKey(current)) {
450         return SeekStatus.FOUND;
451       } else {
452         if (current.compareTo(ramField.termToDocs.lastKey()) > 0) {
453           return SeekStatus.END;
454         } else {
455           return SeekStatus.NOT_FOUND;
456         }
457       }
458     }
459 
460     @Override
461     public void seekExact(long ord) {
462       throw new UnsupportedOperationException();
463     }
464 
465     @Override
466     public long ord() {
467       throw new UnsupportedOperationException();
468     }
469 
470     @Override
471     public BytesRef term() {
472       // TODO: reuse BytesRef
473       return new BytesRef(current);
474     }
475 
476     @Override
477     public int docFreq() {
478       return ramField.termToDocs.get(current).docs.size();
479     }
480 
481     @Override
482     public long totalTermFreq() {
483       return ramField.termToDocs.get(current).totalTermFreq;
484     }
485 
486     @Override
487     public PostingsEnum postings(PostingsEnum reuse, int flags) {
488       return new RAMDocsEnum(ramField.termToDocs.get(current));
489     }
490 
491   }
492 
493   private static class RAMDocsEnum extends PostingsEnum {
494     private final RAMTerm ramTerm;
495     private RAMDoc current;
496     int upto = -1;
497     int posUpto = 0;
498 
499     public RAMDocsEnum(RAMTerm ramTerm) {
500       this.ramTerm = ramTerm;
501     }
502 
503     @Override
504     public int advance(int targetDocID) throws IOException {
505       return slowAdvance(targetDocID);
506     }
507 
508     // TODO: override bulk read, for better perf
509     @Override
510     public int nextDoc() {
511       upto++;
512       if (upto < ramTerm.docs.size()) {
513         current = ramTerm.docs.get(upto);
514         posUpto = 0;
515         return current.docID;
516       } else {
517         return NO_MORE_DOCS;
518       }
519     }
520 
521     @Override
522     public int freq() throws IOException {
523       return current.positions.length;
524     }
525 
526     @Override
527     public int docID() {
528       return current.docID;
529     }
530 
531     @Override
532     public int nextPosition() {
533       assert posUpto < current.positions.length;
534       return current.positions[posUpto++];
535     }
536 
537     @Override
538     public int startOffset() {
539       return -1;
540     }
541 
542     @Override
543     public int endOffset() {
544       return -1;
545     }
546 
547     @Override
548     public BytesRef getPayload() {
549       if (current.payloads != null && current.payloads[posUpto-1] != null) {
550         return new BytesRef(current.payloads[posUpto-1]);
551       } else {
552         return null;
553       }
554     }
555     
556     @Override
557     public long cost() {
558       return ramTerm.docs.size();
559     } 
560   }
561 
562   // Holds all indexes created, keyed by the ID assigned in fieldsConsumer
563   private final Map<Integer,RAMPostings> state = new HashMap<>();
564 
565   private final AtomicInteger nextID = new AtomicInteger();
566 
567   private final String RAM_ONLY_NAME = "RAMOnly";
568   private final static int VERSION_START = 0;
569   private final static int VERSION_LATEST = VERSION_START;
570 
571   private static final String ID_EXTENSION = "id";
572 
573   @Override
574   public FieldsConsumer fieldsConsumer(SegmentWriteState writeState) throws IOException {
575     final int id = nextID.getAndIncrement();
576 
577     // TODO -- ok to do this up front instead of
578     // on close....?  should be ok?
579     // Write our ID:
580     final String idFileName = IndexFileNames.segmentFileName(writeState.segmentInfo.name, writeState.segmentSuffix, ID_EXTENSION);
581     IndexOutput out = writeState.directory.createOutput(idFileName, writeState.context);
582     boolean success = false;
583     try {
584       CodecUtil.writeHeader(out, RAM_ONLY_NAME, VERSION_LATEST);
585       out.writeVInt(id);
586       success = true;
587     } finally {
588       if (!success) {
589         IOUtils.closeWhileHandlingException(out);
590       } else {
591         IOUtils.close(out);
592       }
593     }
594     
595     final RAMPostings postings = new RAMPostings();
596     final RAMFieldsConsumer consumer = new RAMFieldsConsumer(writeState, postings);
597 
598     synchronized(state) {
599       state.put(id, postings);
600     }
601     return consumer;
602   }
603 
604   @Override
605   public FieldsProducer fieldsProducer(SegmentReadState readState)
606     throws IOException {
607 
608     // Load our ID:
609     final String idFileName = IndexFileNames.segmentFileName(readState.segmentInfo.name, readState.segmentSuffix, ID_EXTENSION);
610     IndexInput in = readState.directory.openInput(idFileName, readState.context);
611     boolean success = false;
612     final int id;
613     try {
614       CodecUtil.checkHeader(in, RAM_ONLY_NAME, VERSION_START, VERSION_LATEST);
615       id = in.readVInt();
616       success = true;
617     } finally {
618       if (!success) {
619         IOUtils.closeWhileHandlingException(in);
620       } else {
621         IOUtils.close(in);
622       }
623     }
624     
625     synchronized(state) {
626       return state.get(id);
627     }
628   }
629 }